Background
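The OP_WRITE event fires when the free space in a socket's send buffer is greater than or equal to the buffer's low-water mark, SO_SNDLOWAT. Under normal conditions a socket is almost always writable, so programs usually do not register interest in OP_WRITE. Instead they simply write in a loop like this: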
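```java
// Keep calling write() until the buffer is drained.
// If the send buffer stays full, write() returns 0 and this loop spins.
while (bb.hasRemaining()) {
    int len = socketChannel.write(bb);
    if (len < 0) {
        throw new EOFException();
    }
}
```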
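The claim that a socket is normally writable is easy to check. The sketch below uses hypothetical names (`AlwaysWritableDemo`, `countWritableSelects`). It opens a loopback connection, registers the idle client side for OP_WRITE, and counts how many `select(1000)` calls report the key as ready. Because the send buffer is empty, every call returns at once. That is why leaving OP_WRITE registered all the time would turn the selector loop itself into a busy loop.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AlwaysWritableDemo {
    /**
     * Connects a loopback socket pair, registers the idle client side for OP_WRITE,
     * and returns how many of `rounds` select(1000) calls found it ready.
     */
    static int countWritableSelects(int rounds) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_WRITE);
                int ready = 0;
                for (int i = 0; i < rounds; i++) {
                    // The send buffer is empty, so the key is ready at once and
                    // select() returns without waiting for the 1000 ms timeout.
                    if (selector.select(1000) > 0) {
                        ready++;
                    }
                    selector.selectedKeys().clear();
                }
                return ready;
            }
        }
    }
}
```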
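This simple write loop works in most cases. Under high concurrency on a poor network, however, the send buffer can fill up. write() then returns 0 without writing anything, so the loop makes no progress and the thread spins until CPU usage reaches 100%. Below we look at how several NIO-based frameworks deal with this problem.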
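To see the failure mode directly, the sketch below writes 64 KiB chunks to a non-blocking socket until `write()` first returns 0. The names `SendBufferFullDemo` and `bytesUntilFirstZeroWrite` are hypothetical. A loopback peer that accepts the connection but never reads stands in for a slow client on a bad network.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class SendBufferFullDemo {
    /**
     * Writes 64 KiB chunks to a non-blocking socket whose peer never reads and
     * returns how many bytes were accepted before write() first returned 0.
     */
    static long bytesUntilFirstZeroWrite() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) { // peer is never read from
                client.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                while (true) {
                    chunk.clear();
                    int n = client.write(chunk);
                    if (n == 0) {
                        // Local send buffer is full. A naive
                        // `while (bb.hasRemaining()) write(bb)` loop would now
                        // call write() over and over without making progress.
                        return total;
                    }
                    total += n;
                    if (total > (1L << 30)) {
                        throw new IllegalStateException("send buffer never filled");
                    }
                }
            }
        }
    }
}
```

The exact byte count depends on the operating system's socket-buffer sizes. Only the fact that a zero-byte write eventually occurs is meaningful.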
Spymemcached's approach:
```java
// Event-loop side: write as much as possible without ever waiting.
private void handleWrites(SelectionKey sk, MemcachedNode qa) throws IOException {
    qa.fillWriteBuffer(shouldOptimize);
    boolean canWriteMore = qa.getBytesRemainingToWrite() > 0;
    while (canWriteMore) {
        int wrote = qa.writeSome();
        qa.fillWriteBuffer(shouldOptimize);
        // Stop as soon as a write makes no progress (send buffer full);
        // the leftover bytes are picked up later via getSelectionOps().
        canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
    }
}

// Node side (MemcachedNode implementation):
public final int writeSome() throws IOException {
    int wrote = channel.write(wbuf);
    toWrite -= wrote;
    return wrote;
}

public final int getSelectionOps() {
    int rv = 0;
    if (getChannel().isConnected()) {
        if (hasReadOp()) {
            rv |= SelectionKey.OP_READ;
        }
        // Ask for OP_WRITE while bytes remain unwritten (toWrite > 0)
        // or hasWriteOp() reports pending writes.
        if (toWrite > 0 || hasWriteOp()) {
            rv |= SelectionKey.OP_WRITE;
        }
    } else {
        rv = SelectionKey.OP_CONNECT;
    }
    return rv;
}
```
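Note: Spymemcached does its I/O on a single thread, so it must never block. When a write makes no progress, it does not stall the thread. It returns immediately instead, and because bytes remain (toWrite > 0), getSelectionOps() adds OP_WRITE when the main loop next registers interest.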
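The pattern is "stop on zero progress and let the selector resume". Below is a minimal sketch of it, assuming a hypothetical `NonBlockingWriter` class (this is not Spymemcached's API). A `CappedChannel` test double stands in for a socket whose send buffer fills up after a fixed number of bytes. The interest set always includes OP_READ, which is a simplification of `hasReadOp()`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

/** Hypothetical single-threaded writer illustrating the pattern (not Spymemcached's API). */
class NonBlockingWriter {
    private final WritableByteChannel channel;
    private final ByteBuffer pending;

    NonBlockingWriter(WritableByteChannel channel, ByteBuffer pending) {
        this.channel = channel;
        this.pending = pending;
    }

    /** Writes until the buffer is empty or a write makes no progress; never waits. */
    void handleWrites() throws IOException {
        boolean canWriteMore = pending.hasRemaining();
        while (canWriteMore) {
            int wrote = channel.write(pending);
            canWriteMore = wrote > 0 && pending.hasRemaining();
        }
    }

    /** Interest set for the next select(): OP_WRITE only while bytes are still pending. */
    int selectionOps() {
        int ops = SelectionKey.OP_READ; // simplification: always interested in reads
        if (pending.hasRemaining()) {
            ops |= SelectionKey.OP_WRITE;
        }
        return ops;
    }
}

/** Test double: accepts up to `capacity` bytes, then returns 0 like a full send buffer. */
class CappedChannel implements WritableByteChannel {
    private int capacity;

    CappedChannel(int capacity) { this.capacity = capacity; }

    /** Simulates the peer draining `bytes` from the connection. */
    void drain(int bytes) { capacity += bytes; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(capacity, src.remaining());
        src.position(src.position() + n);
        capacity -= n;
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}
```

In a real event loop, the value of `selectionOps()` would be applied with `key.interestOps(...)` before the next `select()`. The thread then only wakes for OP_WRITE while there is something left to send.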
Netty's approach (Netty 3.x NIO transport):
```java
protected void write0(AbstractNioChannel<?> channel) {
    boolean open = true;
    boolean addOpWrite = false;
    boolean removeOpWrite = false;
    boolean iothread = isIoThread(channel);

    long writtenBytes = 0;

    final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
    final WritableByteChannel ch = channel.channel;
    final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
    final int writeSpinCount = channel.getConfig().getWriteSpinCount();
    List<Throwable> causes = null;

    synchronized (channel.writeLock) {
        channel.inWriteNowLoop = true;
        for (;;) {
            MessageEvent evt = channel.currentWriteEvent;
            SendBuffer buf = null;
            ChannelFuture future = null;
            try {
                if (evt == null) {
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                        // Queue drained: nothing left to write, so OP_WRITE can be cleared.
                        removeOpWrite = true;
                        channel.writeSuspended = false;
                        break;
                    }
                    future = evt.getFuture();
                    channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                } else {
                    future = evt.getFuture();
                    buf = channel.currentWriteBuffer;
                }

                // Retry up to writeSpinCount times while transferTo() writes 0 bytes
                // (send buffer full); stop at the first write that makes progress.
                long localWrittenBytes = 0;
                for (int i = writeSpinCount; i > 0; i --) {
                    localWrittenBytes = buf.transferTo(ch);
                    if (localWrittenBytes != 0) {
                        writtenBytes += localWrittenBytes;
                        break;
                    }
                    if (buf.finished()) {
                        break;
                    }
                }

                if (buf.finished()) {
                    // Message fully written.
                    buf.release();
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    evt = null;
                    buf = null;
                    future.setSuccess();
                } else {
                    // Not finished: suspend and let the selector signal OP_WRITE later.
                    addOpWrite = true;
                    channel.writeSuspended = true;

                    if (writtenBytes > 0) {
                        future.setProgress(
                                localWrittenBytes,
                                buf.writtenBytes(), buf.totalBytes());
                    }
                    break;
                }
            } catch (Throwable t) {
                // Exception handling omitted in this excerpt.
            }
        }
        channel.inWriteNowLoop = false;

        if (open) {
            if (addOpWrite) {
                setOpWrite(channel);
            } else if (removeOpWrite) {
                clearOpWrite(channel);
            }
        }
    }
}
```
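Note: Netty runs several I/O worker threads, so it can afford to spend a little extra effort on a write. It does not give up after the first zero-byte write. Instead it retries up to writeSpinCount times, which is a bounded busy retry rather than a blocking wait. If the message is still not fully written after that, it sets addOpWrite, registers OP_WRITE through setOpWrite(channel), and leaves the loop. The selector then resumes the write once the socket becomes writable again.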
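The write-spin step can be isolated as below. `SpinWriter` and the `StallingChannel` test double are hypothetical names used for illustration. The loop mirrors the structure of the excerpt above: it retries zero-byte writes, stops at the first write that makes progress, and then reports whether OP_WRITE is needed. It is a sketch, not Netty's API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class SpinWriter {
    /**
     * Sketch of the write-spin idea: retry zero-byte writes up to spinCount times,
     * stop at the first write that makes progress, and return true if data is
     * still pending (the caller should then register OP_WRITE and return).
     */
    static boolean writeWithSpin(WritableByteChannel ch, ByteBuffer buf, int spinCount)
            throws IOException {
        for (int i = spinCount; i > 0; i--) {
            if (ch.write(buf) != 0) {
                break; // progress made; stop spinning
            }
            if (!buf.hasRemaining()) {
                break; // nothing left to write
            }
        }
        return buf.hasRemaining();
    }
}

/** Test double: returns 0 for the first `stalls` calls, then accepts everything. */
class StallingChannel implements WritableByteChannel {
    private int stalls;
    int calls;

    StallingChannel(int stalls) { this.stalls = stalls; }

    @Override
    public int write(ByteBuffer src) {
        calls++;
        if (stalls > 0) {
            stalls--;
            return 0;
        }
        int n = src.remaining();
        src.position(src.limit());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}
```

Spinning absorbs a short stall without a round trip through the selector. The bound keeps a persistently full send buffer from pinning the CPU.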
Grizzly's approach (Grizzly originated in the GlassFish project):
```java
public static long flushChannel(SocketChannel socketChannel, ByteBuffer bb, long writeTimeout)
        throws IOException {
    SelectionKey key = null;
    Selector writeSelector = null;
    int attempts = 0;
    int bytesProduced = 0;
    try {
        while (bb.hasRemaining()) {
            int len = socketChannel.write(bb);
            attempts++;
            if (len < 0) {
                throw new EOFException();
            }
            bytesProduced += len;
            if (len == 0) {
                // Send buffer is full: wait on a temporary selector instead of spinning.
                if (writeSelector == null) {
                    writeSelector = SelectorFactory.getSelector();
                    if (writeSelector == null) {
                        // No selector available: retry the write.
                        continue;
                    }
                }
                key = socketChannel.register(writeSelector, SelectionKey.OP_WRITE);
                if (writeSelector.select(writeTimeout) == 0) {
                    // Timed out without becoming writable; give up after repeated timeouts.
                    if (attempts > 2) {
                        throw new IOException("Client disconnected");
                    }
                } else {
                    attempts--;
                }
            } else {
                attempts = 0;
            }
        }
    } finally {
        // Cleanup, not part of the original excerpt (assumed Grizzly 1.x SelectorFactory API):
        // cancel the key, flush the cancellation, and hand the selector back to the pool.
        if (key != null) {
            key.cancel();
        }
        if (writeSelector != null) {
            writeSelector.selectNow();
            SelectorFactory.returnSelector(writeSelector);
        }
    }
    return bytesProduced;
}
```
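Note: Grizzly is multi-threaded, so a bounded blocking wait is acceptable. Instead of registering OP_WRITE on the main selector, it registers it on a separate, temporary selector. It then blocks in select(writeTimeout) for up to writeTimeout milliseconds, waiting for the channel to become writable.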
Why do it this way? Grizzly's author explained it as follows:
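- The temporary Selector is there to reduce thread switching. The main Selector normally handles OP_ACCEPT and OP_READ, and a temporary Selector takes load off it. Registering OP_WRITE on the main Selector, by contrast, would require a thread switch and cause unnecessary system calls. Avoiding these frequent switches between threads improves performance.
- writeSelector.select(writeTimeout) does block, but only in a few extreme situations. Most clients rarely hit this case, so few threads are blocked at any one time.
- The blocking wait is also used to detect client connections that have been dropped abnormally.
- Stress tests showed that this implementation performs very well.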
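The temporary-selector technique, including timeout-based detection of dead clients, can be sketched in self-contained form as below. This sketch rests on three assumptions. First, it opens a fresh `Selector` with `Selector.open()` instead of borrowing one from Grizzly's `SelectorFactory` pool. Second, it treats three consecutive `select()` timeouts as a dead client, which is roughly what the `attempts` counter in `flushChannel` does. Third, it clears the selected-key set after each wakeup. `TempSelectorFlush`, `flush` and `flushToLoopback` are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class TempSelectorFlush {
    /**
     * Writes all of bb to a non-blocking channel. On a zero-byte write it blocks on a
     * temporary selector (never the main one) for up to writeTimeoutMs; three
     * consecutive timeouts are treated as a dead client.
     */
    static long flush(SocketChannel ch, ByteBuffer bb, long writeTimeoutMs) throws IOException {
        Selector tmp = null;
        int timeouts = 0;
        long written = 0;
        try {
            while (bb.hasRemaining()) {
                int len = ch.write(bb);
                if (len < 0) {
                    throw new EOFException();
                }
                if (len > 0) {
                    written += len;
                    timeouts = 0;
                    continue;
                }
                if (tmp == null) {
                    tmp = Selector.open(); // assumption: fresh selector instead of a pooled one
                    ch.register(tmp, SelectionKey.OP_WRITE);
                }
                if (tmp.select(writeTimeoutMs) == 0) {
                    if (++timeouts >= 3) {
                        throw new IOException("Client disconnected");
                    }
                } else {
                    tmp.selectedKeys().clear(); // so the next select() can report readiness again
                }
            }
            return written;
        } finally {
            if (tmp != null) {
                tmp.close(); // also deregisters the channel from the temporary selector
            }
        }
    }

    /** Usage sketch: flush `size` bytes over loopback to a peer that does or does not read. */
    static long flushToLoopback(int size, boolean peerReads, long timeoutMs) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                Thread reader = new Thread(() -> {
                    ByteBuffer sink = ByteBuffer.allocate(64 * 1024);
                    try {
                        while (peer.read(sink) >= 0) {
                            sink.clear(); // discard everything the client sends
                        }
                    } catch (IOException ignored) {
                        // peer closed while reading; nothing to do
                    }
                });
                reader.setDaemon(true);
                if (peerReads) {
                    reader.start();
                }
                client.configureBlocking(false);
                return flush(client, ByteBuffer.allocate(size), timeoutMs);
            }
        }
    }
}
```

With a reading peer, `flush` returns once every byte has been accepted by the kernel. With a peer that never reads, it gives up with "Client disconnected" after about three timeouts, which corresponds to the dead-connection detection described above.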